<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta name="description" content="">
    <meta name="author" content="">
    <meta name="keyword" content="">
    <link rel="shortcut icon" href="/node-mongodb-native/img/favicon.png">

    <title>Queues</title>

    <link href="/node-mongodb-native/css/bootstrap-theme.css" rel="stylesheet">
    <link href="/node-mongodb-native/assets/font-awesome/css/font-awesome.min.css" rel="stylesheet" />
    <link href="/node-mongodb-native/css/style.css" rel="stylesheet">
    <link href="/node-mongodb-native/css/style-responsive.css" rel="stylesheet" />
    <link href="/node-mongodb-native/css/monokai_sublime.css" rel="stylesheet" />

  </head>

  <body>

  <section id="container" class="">


      <header class="header black-bg">
            <div class="toggle-nav">
                <i class="fa fa-bars"></i>
                <div class="icon-reorder tooltips" data-original-title="Toggle Navigation" data-placement="bottom"></div>
            </div>


            <a href="/node-mongodb-native/" class="logo"><img src="/node-mongodb-native/img/logo-mongodb-header.png" style="height:40px;"></a>

            <div class="nav title-row" id="top_menu">
                <h1 class="nav top-menu"> Queues </h1>
            </div>
      </header>



<aside>
    <div id="sidebar"  class="nav-collapse ">

        <ul class="sidebar-menu">




            <li class="sub-menu active">
            <a href="javascript:;" class="">
                <i class='fa fa-book'></i>

                <span>schema design</span>
                <span class="menu-arrow fa fa-angle-down"></span>
            </a>
                <ul class="sub open">

                    <li><a href="/node-mongodb-native/schema/chapter1/"> Introduction </a> </li>

                    <li><a href="/node-mongodb-native/schema/chapter2/"> Schema Basics </a> </li>

                    <li><a href="/node-mongodb-native/schema/chapter3/"> MongoDB Storage </a> </li>

                    <li><a href="/node-mongodb-native/schema/chapter4/"> Indexes </a> </li>

                    <li><a href="/node-mongodb-native/schema/chapter5/"> Metadata </a> </li>

                    <li><a href="/node-mongodb-native/schema/chapter6/"> Time Series </a> </li>

                    <li class="active"><a href="/node-mongodb-native/schema/chapter7/"> Queues </a> </li>

                    <li><a href="/node-mongodb-native/schema/chapter8/"> Nested Categories </a> </li>

                    <li><a href="/node-mongodb-native/schema/chapter9/"> Account Transactions </a> </li>

                    <li><a href="/node-mongodb-native/schema/chapter10/"> Shopping Cart </a> </li>

                    <li><a href="/node-mongodb-native/schema/chapter11/"> Sharding </a> </li>

                </ul>

              </li>


                <li>
                <a class="" href="https://mongodb.github.io/node-mongodb-native/2.2">
                    <i class='fa fa-file-text'></i>

                    <span>2.0 driver docs</span>
                </a>

              </li>


                <li>
                <a class="" href="https://mongodb.github.io/node-mongodb-native/2.2">
                    <i class='fa fa-file-text'></i>

                    <span>1.4 driver docs</span>
                </a>

              </li>


                <li>
                <a class="" href="https://mongodb.github.io/node-mongodb-native/2.2">
                    <i class='fa fa-file-text'></i>

                    <span>Core driver docs</span>
                </a>

              </li>

            <li> <a href="https://groups.google.com/forum/#!forum/learn-mongo-db-the-hardway" target="blank"><i class='fa fa-life-ring'></i>Issues & Help</a> </li>


        </ul>

    </div>
</aside>




      <section id="main-content">
          <section class="wrapper">













              <div class="row">
                  <div class="col-md-1">


                      <a class="navigation prev" href="/schema/chapter6">
                      <i class="fa fa-angle-left"></i>
                      </a>


                  </div>
                <div class="col-md-10">
                    <section class="panel">



                    <div class="panel-body">



<h1 id="queues:350c6b49ae8b36a26012060b0876f863">Queues</h1>

<p>A queue lets multiple publishers push messages to it and multiple subscribers can pull messages off it. The message is only delivered to a single subscriber.</p>

<p><img src="/images/originals/queue.png" alt="A Queue With Publishers and Subscribers" />
</p>

<p>A variant of a queue is called a topic and the difference with a queue is that all subscribers receive all messages. In MongoDB the classic example is the <strong>oplog</strong> that contains all operations performed on the <strong>master</strong> database and that all <strong>secondaries</strong> listen to for changes.</p>

<p>We are going to look at both examples and how to implement them using MongoDB.</p>

<h2 id="work-queue:350c6b49ae8b36a26012060b0876f863">Work Queue</h2>

<p>The work queue will contain messages describing work to be performed asynchronously. In our example this will be to process images that have been uploaded.</p>

<p>The document describing a job looks like and we are inserting it into our queue collections.</p>

<pre><code class="language-js">var col = db.getSisterDB(&quot;work&quot;).queue;
col.insert({
  &quot;input&quot;: &quot;/img/images.png&quot;,
  &quot;output&quot;: &quot;/converted/images.jpg&quot;
});
</code></pre>

<p>The <strong>ObjectId</strong> that is added to all documents as the <strong>_id</strong> field if not overridden contains a timestamp allowing us to use it to sort by time. To fetch the next document in a <em>FIFO</em> (First In First Out) manner do.</p>

<pre><code class="language-js">var col = db.getSisterDB(&quot;work&quot;).queue;
var job = col.findAndModify({
    query: {}
  , sort: {_id: 1}
  , remove: true
});
</code></pre>

<p>This will sort the jobs in <em>ascending order</em> by <strong>_id</strong> and remove and return the first one. Since <strong>findAndModify</strong> is an <em>atomic</em> operation it guarantees that only a single subscriber receives the message.</p>

<h3 id="work-queue-with-priorities-and-timestamps:350c6b49ae8b36a26012060b0876f863">Work Queue With Priorities and Timestamps</h3>

<p>We can extend the work queue very easily to allow for priorities and statistics by extending the work document to include a couple of more fields.</p>

<pre><code class="language-js">var col = db.getSisterDB(&quot;work&quot;).queue;
col.insert({
  &quot;priority&quot;: 1,
  &quot;input&quot;: &quot;/img/images.png&quot;,
  &quot;output&quot;: &quot;/converted/images.jpg&quot;,
  &quot;start_time&quot;: null,
  &quot;end_time&quot;: null,
});
</code></pre>

<p>In the previous example we are consuming the actual work item by removing it from the collection but here we want to keep it around for reporting purposes. Let&rsquo;s grab the highest priority work item.</p>

<pre><code class="language-js">var col = db.getSisterDB(&quot;work&quot;).queue;
var job = col.findAndModify({
    query: {start_time: null}
  , sort: {priority: -1}
  , update: {$set: {start_time: new Date()}}
  , new: true
});
</code></pre>

<p>When we are done we can set the end time for the job using a simple update.</p>

<pre><code class="language-js">var col = db.getSisterDB(&quot;work&quot;).queue;
col.update({_id: job._id}, {$set: {end_time: new Date()}});
</code></pre>

<h2 id="stock-ticker-topic:350c6b49ae8b36a26012060b0876f863">Stock Ticker Topic</h2>

<p>The stock ticker topic allows multiple applications to listen to a live stream of data about stock price. For the topic we want to ensure maximum throughput. We can achieve this by using a special type of collection in MongoDB called a <strong>capped collection</strong>.</p>

<p>A <strong>capped collection</strong> is basically what we call a ring buffer meaning they have a fixed size. Once the application goes over the size it starts overwriting documents.</p>

<p><img src="/images/originals/ring_buffer.png" alt="A Ring Buffer" />
</p>

<p>The benefit of the <strong>capped collection</strong> is that it allows for <strong>tailing</strong> meaning applications can listen to new documents being inserted. Let&rsquo;s set up our stock ticker schema.</p>

<pre><code class="language-json">{
  &quot;time&quot;: ISODate(&quot;2014-01-01T10:01:22Z&quot;)
  &quot;ticker&quot;: &quot;PIP&quot;,
  &quot;price&quot;: &quot;4.45&quot;
}
</code></pre>

<p>Create a new <strong>capped collection</strong></p>

<pre><code class="language-js">var db = db.getSisterDB(&quot;finance&quot;);
db.createCollection(&quot;ticker&quot;, {capped:true, size:100000})
</code></pre>

<p>Let&rsquo;s boot up a new shell and set up a producer of tickers that emits a random price between 0 and 100 for PIP once a second.</p>

<pre><code class="language-js">var db = db.getSisterDB(&quot;finance&quot;);
while(true) {
  db.ticker.insert({
    time: new Date(),
    ticker: &quot;PIP&quot;,
    price: 100 * Math.random(1000)
  })

  sleep(1000);
}
</code></pre>

<p>Let&rsquo;s boot up a consumer for the tickers using a <strong>tailable</strong> cursor.</p>

<pre><code class="language-js">var db = db.getSisterDB(&quot;finance&quot;);
var cursor = db.ticker.find({time: {$gte: new Date()}}).addOption(DBQuery.Option.tailable).addOption(DBQuery.Option.awaitData)

while(cursor.hasNext) {
  print(JSON.stringify(cursor.next(), null, 2))
}
</code></pre>

<p>This consumer will get any ticker prices that are equal to or newer than the current time when it starts up.</p>

<h2 id="time-to-live-indexes-ttl:350c6b49ae8b36a26012060b0876f863">Time To Live Indexes (TTL)</h2>

<p>MongoDB 2.4 or higher has a new type of index called TTL that lets the server expire documents gradually over time. Take the following document.</p>

<pre><code class="language-json">{
  &quot;hello&quot;: &quot;world&quot;,
  &quot;created_on&quot;: ISODate(&quot;2014-01-01T10:01:22Z&quot;)
}
</code></pre>

<p>Say we only want to keep the last 24 hours worth of data. This could be accomplished by performing a bulk remove of document using a batch job, or it can be more elegantly be solved using a TTL index.</p>

<pre><code class="language-js">var db = db.getSisterDB(&quot;data&quot;);
var numberOfSeconds = 60 * 60 * 24; // 60 sec * 60 min * 24 hours
db.expire.ensureIndex({ &quot;created_on&quot;: 1}, {expireAfterSeconds: numberOfSeconds })
</code></pre>

<p>As documents expire against the TTL index they will be removed gradually over time.</p>

<blockquote>
<p>Time To Live Indexes (TTL)</p>

<p>The TTL index is not a hard real-time limit of expiry. It only guarantees that document will be expired some time after it hits the expire threshold but this period will vary depending on the load on MongoDB and other currently running operations.</p>
</blockquote>



                <div class="col-md-1">


                    <a class="navigation next" href="/schema/chapter8">
                        <i class="fa fa-angle-right"> </i>
                    </a>


                </div>
              </div>

          </section>
      </section>

  </section>


    <script src="/node-mongodb-native/js/jquery-2.1.1.min.js"></script>
    <script src="/node-mongodb-native/js/jquery.scrollTo.min.js"></script>
    <script src="/node-mongodb-native/js/bootstrap.min.js"></script>

    <script src="/node-mongodb-native/js/highlight.pack.js"></script>
    <script>hljs.initHighlightingOnLoad();</script>
    <script src="/node-mongodb-native/js/scripts.js"></script>
    <script async defer id="github-bjs" src="/node-mongodb-native/js/buttons.js"></script>
    <script>
  (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
  (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
  m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
  })(window,document,'script','//www.google-analytics.com/analytics.js','ga');

  ga('create', 'UA-41953057-1', 'auto');
  ga('send', 'pageview');

</script>
  </body>
</html>
